前后分离

扩展quartz分布式定时任务

2019-05-21  本文已影响0人  梦想实现家_Z

常规使用quartz需要每一个任务都实现org.quartz.Job接口,每次添加,删除或者修改任务配置都需要停机修改代码再重新部署,即使quartz提供API可以运行时删除或修改,但是机器重启后,只要代码不修改,还是会恢复成初始的配置。
针对以上问题,最好把定时任务做成动态可配置的,这样才能更方便地管理。理想的结果是:可以直接使用spring管理的任意Bean对象,只需指定Bean的名称和方法名,就能按照一定的调度策略执行定时任务。
github地址:https://github.com/zw201913/quartz-cluster,里面有详细使用方式。

1.先把quartz需要的数据表创建好

-- ----------------------------
-- Table structure for QRTZ_JOB_DETAILS
-- ----------------------------
-- QRTZ_TRIGGERS holds a foreign key to QRTZ_JOB_DETAILS, and the
-- QRTZ_BLOB/CRON/SIMPLE/SIMPROP_TRIGGERS tables hold foreign keys to
-- QRTZ_TRIGGERS. MySQL refuses to drop a table that is still referenced by a
-- foreign key, so when this script is re-run the dependants must be dropped
-- first, children before parents.
DROP TABLE IF EXISTS `QRTZ_SIMPROP_TRIGGERS`;
DROP TABLE IF EXISTS `QRTZ_SIMPLE_TRIGGERS`;
DROP TABLE IF EXISTS `QRTZ_CRON_TRIGGERS`;
DROP TABLE IF EXISTS `QRTZ_BLOB_TRIGGERS`;
DROP TABLE IF EXISTS `QRTZ_TRIGGERS`;
DROP TABLE IF EXISTS `QRTZ_JOB_DETAILS`;
CREATE TABLE `QRTZ_JOB_DETAILS` (
  `SCHED_NAME` varchar(120) COLLATE utf8mb4_bin NOT NULL,
  `JOB_NAME` varchar(200) COLLATE utf8mb4_bin NOT NULL,
  `JOB_GROUP` varchar(200) COLLATE utf8mb4_bin NOT NULL,
  `DESCRIPTION` varchar(250) COLLATE utf8mb4_bin DEFAULT NULL,
  `JOB_CLASS_NAME` varchar(250) COLLATE utf8mb4_bin NOT NULL,
  -- Single-character flag columns.
  `IS_DURABLE` varchar(1) COLLATE utf8mb4_bin NOT NULL,
  `IS_NONCONCURRENT` varchar(1) COLLATE utf8mb4_bin NOT NULL,
  `IS_UPDATE_DATA` varchar(1) COLLATE utf8mb4_bin NOT NULL,
  `REQUESTS_RECOVERY` varchar(1) COLLATE utf8mb4_bin NOT NULL,
  `JOB_DATA` blob,
  -- Composite key; QRTZ_TRIGGERS references exactly these three columns.
  PRIMARY KEY (`SCHED_NAME`,`JOB_NAME`,`JOB_GROUP`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

-- ----------------------------
-- Table structure for QRTZ_TRIGGERS
-- ----------------------------
-- Character columns inherit utf8mb4 / utf8mb4_bin from the table defaults.
DROP TABLE IF EXISTS `QRTZ_TRIGGERS`;
CREATE TABLE `QRTZ_TRIGGERS` (
  `SCHED_NAME`     VARCHAR(120) NOT NULL,
  `TRIGGER_NAME`   VARCHAR(200) NOT NULL,
  `TRIGGER_GROUP`  VARCHAR(200) NOT NULL,
  `JOB_NAME`       VARCHAR(200) NOT NULL,
  `JOB_GROUP`      VARCHAR(200) NOT NULL,
  `DESCRIPTION`    VARCHAR(250) DEFAULT NULL,
  `NEXT_FIRE_TIME` BIGINT(13)   DEFAULT NULL,
  `PREV_FIRE_TIME` BIGINT(13)   DEFAULT NULL,
  `PRIORITY`       INT(11)      DEFAULT NULL,
  `TRIGGER_STATE`  VARCHAR(16)  NOT NULL,
  `TRIGGER_TYPE`   VARCHAR(8)   NOT NULL,
  `START_TIME`     BIGINT(13)   NOT NULL,
  `END_TIME`       BIGINT(13)   DEFAULT NULL,
  `CALENDAR_NAME`  VARCHAR(200) DEFAULT NULL,
  `MISFIRE_INSTR`  SMALLINT(2)  DEFAULT NULL,
  `JOB_DATA`       BLOB,
  PRIMARY KEY (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`),
  -- Index backing the foreign key to the owning job.
  KEY `SCHED_NAME` (`SCHED_NAME`, `JOB_NAME`, `JOB_GROUP`),
  CONSTRAINT `QRTZ_TRIGGERS_ibfk_1`
    FOREIGN KEY (`SCHED_NAME`, `JOB_NAME`, `JOB_GROUP`)
    REFERENCES `QRTZ_JOB_DETAILS` (`SCHED_NAME`, `JOB_NAME`, `JOB_GROUP`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin;

-- ----------------------------
-- Table structure for QRTZ_BLOB_TRIGGERS
-- ----------------------------
-- Character columns inherit utf8mb4 / utf8mb4_bin from the table defaults.
DROP TABLE IF EXISTS `QRTZ_BLOB_TRIGGERS`;
CREATE TABLE `QRTZ_BLOB_TRIGGERS` (
  `SCHED_NAME`    VARCHAR(120) NOT NULL,
  `TRIGGER_NAME`  VARCHAR(200) NOT NULL,
  `TRIGGER_GROUP` VARCHAR(200) NOT NULL,
  `BLOB_DATA`     BLOB,
  PRIMARY KEY (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`),
  -- Each row belongs to the QRTZ_TRIGGERS row with the same key.
  CONSTRAINT `QRTZ_BLOB_TRIGGERS_ibfk_1`
    FOREIGN KEY (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`)
    REFERENCES `QRTZ_TRIGGERS` (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin;

-- ----------------------------
-- Table structure for QRTZ_CALENDARS
-- ----------------------------
-- Character columns inherit utf8mb4 / utf8mb4_bin from the table defaults.
DROP TABLE IF EXISTS `QRTZ_CALENDARS`;
CREATE TABLE `QRTZ_CALENDARS` (
  `SCHED_NAME`    VARCHAR(120) NOT NULL,
  `CALENDAR_NAME` VARCHAR(200) NOT NULL,
  `CALENDAR`      BLOB         NOT NULL,
  PRIMARY KEY (`SCHED_NAME`, `CALENDAR_NAME`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin;

-- ----------------------------
-- Table structure for QRTZ_CRON_TRIGGERS
-- ----------------------------
-- Character columns inherit utf8mb4 / utf8mb4_bin from the table defaults.
DROP TABLE IF EXISTS `QRTZ_CRON_TRIGGERS`;
CREATE TABLE `QRTZ_CRON_TRIGGERS` (
  `SCHED_NAME`      VARCHAR(120) NOT NULL,
  `TRIGGER_NAME`    VARCHAR(200) NOT NULL,
  `TRIGGER_GROUP`   VARCHAR(200) NOT NULL,
  `CRON_EXPRESSION` VARCHAR(200) NOT NULL,
  `TIME_ZONE_ID`    VARCHAR(80)  DEFAULT NULL,
  PRIMARY KEY (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`),
  -- Each row belongs to the QRTZ_TRIGGERS row with the same key.
  CONSTRAINT `QRTZ_CRON_TRIGGERS_ibfk_1`
    FOREIGN KEY (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`)
    REFERENCES `QRTZ_TRIGGERS` (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin;

-- ----------------------------
-- Table structure for QRTZ_FIRED_TRIGGERS
-- ----------------------------
-- Character columns inherit utf8mb4 / utf8mb4_bin from the table defaults.
-- This table declares no foreign keys.
DROP TABLE IF EXISTS `QRTZ_FIRED_TRIGGERS`;
CREATE TABLE `QRTZ_FIRED_TRIGGERS` (
  `SCHED_NAME`        VARCHAR(120) NOT NULL,
  `ENTRY_ID`          VARCHAR(95)  NOT NULL,
  `TRIGGER_NAME`      VARCHAR(200) NOT NULL,
  `TRIGGER_GROUP`     VARCHAR(200) NOT NULL,
  `INSTANCE_NAME`     VARCHAR(200) NOT NULL,
  `FIRED_TIME`        BIGINT(13)   NOT NULL,
  `SCHED_TIME`        BIGINT(13)   NOT NULL,
  `PRIORITY`          INT(11)      NOT NULL,
  `STATE`             VARCHAR(16)  NOT NULL,
  `JOB_NAME`          VARCHAR(200) DEFAULT NULL,
  `JOB_GROUP`         VARCHAR(200) DEFAULT NULL,
  `IS_NONCONCURRENT`  VARCHAR(1)   DEFAULT NULL,
  `REQUESTS_RECOVERY` VARCHAR(1)   DEFAULT NULL,
  PRIMARY KEY (`SCHED_NAME`, `ENTRY_ID`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin;


-- ----------------------------
-- Table structure for QRTZ_LOCKS
-- ----------------------------
-- Character columns inherit utf8mb4 / utf8mb4_bin from the table defaults.
DROP TABLE IF EXISTS `QRTZ_LOCKS`;
CREATE TABLE `QRTZ_LOCKS` (
  `SCHED_NAME` VARCHAR(120) NOT NULL,
  `LOCK_NAME`  VARCHAR(40)  NOT NULL,
  PRIMARY KEY (`SCHED_NAME`, `LOCK_NAME`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin;

-- ----------------------------
-- Table structure for QRTZ_PAUSED_TRIGGER_GRPS
-- ----------------------------
-- Character columns inherit utf8mb4 / utf8mb4_bin from the table defaults.
DROP TABLE IF EXISTS `QRTZ_PAUSED_TRIGGER_GRPS`;
CREATE TABLE `QRTZ_PAUSED_TRIGGER_GRPS` (
  `SCHED_NAME`    VARCHAR(120) NOT NULL,
  `TRIGGER_GROUP` VARCHAR(200) NOT NULL,
  PRIMARY KEY (`SCHED_NAME`, `TRIGGER_GROUP`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin;

-- ----------------------------
-- Table structure for QRTZ_SCHEDULER_STATE
-- ----------------------------
-- Character columns inherit utf8mb4 / utf8mb4_bin from the table defaults.
DROP TABLE IF EXISTS `QRTZ_SCHEDULER_STATE`;
CREATE TABLE `QRTZ_SCHEDULER_STATE` (
  `SCHED_NAME`        VARCHAR(120) NOT NULL,
  `INSTANCE_NAME`     VARCHAR(200) NOT NULL,
  `LAST_CHECKIN_TIME` BIGINT(13)   NOT NULL,
  `CHECKIN_INTERVAL`  BIGINT(13)   NOT NULL,
  PRIMARY KEY (`SCHED_NAME`, `INSTANCE_NAME`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin;

-- ----------------------------
-- Table structure for QRTZ_SIMPLE_TRIGGERS
-- ----------------------------
-- Character columns inherit utf8mb4 / utf8mb4_bin from the table defaults.
DROP TABLE IF EXISTS `QRTZ_SIMPLE_TRIGGERS`;
CREATE TABLE `QRTZ_SIMPLE_TRIGGERS` (
  `SCHED_NAME`      VARCHAR(120) NOT NULL,
  `TRIGGER_NAME`    VARCHAR(200) NOT NULL,
  `TRIGGER_GROUP`   VARCHAR(200) NOT NULL,
  `REPEAT_COUNT`    BIGINT(7)    NOT NULL,
  `REPEAT_INTERVAL` BIGINT(12)   NOT NULL,
  `TIMES_TRIGGERED` BIGINT(10)   NOT NULL,
  PRIMARY KEY (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`),
  -- Each row belongs to the QRTZ_TRIGGERS row with the same key.
  CONSTRAINT `QRTZ_SIMPLE_TRIGGERS_ibfk_1`
    FOREIGN KEY (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`)
    REFERENCES `QRTZ_TRIGGERS` (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin;

-- ----------------------------
-- Table structure for QRTZ_SIMPROP_TRIGGERS
-- ----------------------------
-- Character columns inherit utf8mb4 / utf8mb4_bin from the table defaults.
DROP TABLE IF EXISTS `QRTZ_SIMPROP_TRIGGERS`;
CREATE TABLE `QRTZ_SIMPROP_TRIGGERS` (
  `SCHED_NAME`    VARCHAR(120)  NOT NULL,
  `TRIGGER_NAME`  VARCHAR(200)  NOT NULL,
  `TRIGGER_GROUP` VARCHAR(200)  NOT NULL,
  -- Generic, optional property slots grouped by value type.
  `STR_PROP_1`    VARCHAR(512)  DEFAULT NULL,
  `STR_PROP_2`    VARCHAR(512)  DEFAULT NULL,
  `STR_PROP_3`    VARCHAR(512)  DEFAULT NULL,
  `INT_PROP_1`    INT(11)       DEFAULT NULL,
  `INT_PROP_2`    INT(11)       DEFAULT NULL,
  `LONG_PROP_1`   BIGINT(20)    DEFAULT NULL,
  `LONG_PROP_2`   BIGINT(20)    DEFAULT NULL,
  `DEC_PROP_1`    DECIMAL(13,4) DEFAULT NULL,
  `DEC_PROP_2`    DECIMAL(13,4) DEFAULT NULL,
  `BOOL_PROP_1`   VARCHAR(1)    DEFAULT NULL,
  `BOOL_PROP_2`   VARCHAR(1)    DEFAULT NULL,
  PRIMARY KEY (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`),
  -- Each row belongs to the QRTZ_TRIGGERS row with the same key.
  CONSTRAINT `QRTZ_SIMPROP_TRIGGERS_ibfk_1`
    FOREIGN KEY (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`)
    REFERENCES `QRTZ_TRIGGERS` (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin;

2.依赖

// Runtime libraries. The former `compile` entries use `implementation`
// because the `compile` configuration no longer exists in current Gradle.
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-quartz'
implementation 'mysql:mysql-connector-java'
implementation 'org.apache.commons:commons-lang3:3.9'
implementation 'com.google.guava:guava:27.1-jre'
// Lombok is needed at compile time only, plus its annotation processor.
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'

3.因为使用的时候我打算通过一个自定义注解就搞定它,类似下面这样:

import com.github.quartzcluster.annotation.EnableQuartzCluster;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point that switches on the clustered Quartz support.
 *
 * <p>Without an argument {@code @EnableQuartzCluster} falls back to its default
 * value. The commented-out variants below show how to name another properties
 * file, either on the classpath or by absolute path.
 */
@EnableQuartzCluster
// @EnableQuartzCluster("classpath:<file name>")
// @EnableQuartzCluster("<absolute path of the configuration file>")
@SpringBootApplication
public class QuartzClusterApplication {

    /**
     * Boots the application.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        new SpringApplication(QuartzClusterApplication.class).run(args);
    }
}

所以我们需要开始自定义一个EnableQuartzCluster注解

import com.github.quartzcluster.config.DynamicSchedulingConfigurationSelector;
import org.springframework.context.annotation.Import;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Type-level annotation that imports {@link DynamicSchedulingConfigurationSelector}
 * and lets the user name the Quartz properties file to load.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Import({DynamicSchedulingConfigurationSelector.class})
public @interface EnableQuartzCluster {

    /** Prefix of a location that refers to a classpath resource. */
    String CLASSPATH_PREFIX = "classpath:";

    /** File name used when no location is specified. */
    String DEFAULT_PROPERTIES = "quartz.properties";

    /**
     * Location of the properties file.
     *
     * @return the configured location; {@code classpath:quartz.properties} by default
     */
    String value() default CLASSPATH_PREFIX + DEFAULT_PROPERTIES;
}

这个自定义注解有两个作用:
1.注入一些quartz必要的Bean对象
2.允许使用者指定配置文件

import com.github.quartzcluster.annotation.EnableQuartzCluster;
import com.google.common.collect.Maps;
import org.springframework.context.annotation.ImportSelector;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.util.CollectionUtils;

import java.util.Map;

/**
 * Import selector behind {@code @EnableQuartzCluster}: it remembers the
 * annotation's attributes in a static map and names the classes to register.
 */
public class DynamicSchedulingConfigurationSelector implements ImportSelector {

    /** Attributes copied from {@code @EnableQuartzCluster}, keyed by attribute name. */
    public static final Map<String, Object> annotationAttrs = Maps.newConcurrentMap();

    /**
     * Stores the {@code @EnableQuartzCluster} attributes found on the importing
     * class and returns the class names to import.
     *
     * @param importingClassMetadata metadata of the class carrying the annotation
     * @return fully qualified names of the three classes to import
     */
    @Override
    public String[] selectImports(AnnotationMetadata importingClassMetadata) {
        String annotationName = EnableQuartzCluster.class.getName();
        Map<String, Object> attributes =
                importingClassMetadata.getAnnotationAttributes(annotationName);
        // Nothing to remember when the annotation is absent or has no attributes.
        if (attributes != null && !attributes.isEmpty()) {
            annotationAttrs.putAll(attributes);
        }
        String[] imported = {
            ApplicationContextUtil.class.getName(),
            DynamicSchedulingConfiguration.class.getName(),
            SchedulerConfiguration.class.getName()
        };
        return imported;
    }

    /**
     * Looks up a stored annotation attribute.
     *
     * @param key attribute name
     * @return the stored value, or {@code null} when the key is unknown
     */
    public static Object get(String key) {
        return annotationAttrs.get(key);
    }
}

DynamicSchedulingConfigurationSelector把ApplicationContextUtil,DynamicSchedulingConfiguration,SchedulerConfiguration这三个类纳入spring管理,并用annotationAttrs保存@EnableQuartzCluster注解的属性,其中value就是指定的配置文件位置。

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

/**
 * Static access to the Spring {@link ApplicationContext}.
 *
 * <p>Spring hands the context to {@link #setApplicationContext(ApplicationContext)};
 * after that the static helpers can be called from code that Spring does not
 * manage. Calling a helper before the context has been injected fails with an
 * {@link IllegalStateException} rather than a bare {@code NullPointerException}.
 */
public class ApplicationContextUtil implements ApplicationContextAware {

    private static ApplicationContext appContext;

    /**
     * Returns the bean registered under the given name.
     *
     * @param name bean name
     * @return the bean instance
     */
    public static Object getBean(String name) {
        return context().getBean(name);
    }

    /**
     * Returns the bean of the given type.
     *
     * @param clazz required bean type
     * @param <T> bean type
     * @return the bean instance
     */
    public static <T> T getBean(Class<T> clazz) {
        return context().getBean(clazz);
    }

    /**
     * Returns the type of a bean.
     *
     * @param className passed to {@code ApplicationContext.getType(String)}, which
     *     expects a bean name despite this parameter's name
     * @return the bean's type as reported by the context
     */
    public static Class<?> getType(String className) {
        return context().getType(className);
    }

    /**
     * Reads a property from the context's environment.
     *
     * @param name property key
     * @return the property value as returned by the environment
     */
    public static String getProperty(String name) {
        return context().getEnvironment().getProperty(name);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        appContext = applicationContext;
    }

    /**
     * Returns the injected context or fails with a descriptive error.
     *
     * @throws IllegalStateException when no context has been injected yet
     */
    private static ApplicationContext context() {
        ApplicationContext current = appContext;
        if (current == null) {
            throw new IllegalStateException(
                    "ApplicationContext has not been injected yet; "
                            + "ApplicationContextUtil must be registered as a Spring bean before use");
        }
        return current;
    }
}

ApplicationContextUtil实现了ApplicationContextAware接口,获取ApplicationContext上下文后可以通过静态方法获取任意指定Bean对象。如果我们能通过ApplicationContextUtil获取任意指定的Bean对象的话,那就能实现我们最终的目标:通过任意指定的Bean对象,按照一定调度策略,通过反射执行指定的方法

import com.github.quartzcluster.annotation.EnableQuartzCluster;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

import javax.sql.DataSource;
import java.util.Objects;

public class DynamicSchedulingConfiguration {

    /** 获取spring中的数据源 */
    @Autowired private DataSource dataSource;

    @Bean(name = "schedulerFactoryBean", destroyMethod = "destroy")
    public SchedulerFactoryBean schedulerFactoryBean() {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        // 设置数据源
        factory.setDataSource(dataSource);
        // 设置配置文件
        factory.setConfigLocation(propertiesResource());
        // 启动是否自动执行
        factory.setAutoStartup(true);
        // 是否覆盖已有的job
        factory.setOverwriteExistingJobs(true);
        // 项目启动5秒后开始执行定时任务
        factory.setStartupDelay(5);
        return factory;
    }

    /**
     * 获取配置文件
     *
     * @return
     */
    private Resource propertiesResource() {
        String value = (String) DynamicSchedulingConfigurationSelector.get("value");
        if (Objects.isNull(value)) {
            return new ClassPathResource(EnableQuartzCluster.DEFAULT_PROPERTIES);
        }
        if (value.indexOf(EnableQuartzCluster.CLASSPATH_PREFIX) == 0) {
            return new ClassPathResource(value.replace(EnableQuartzCluster.CLASSPATH_PREFIX, ""));
        }
        return new FileSystemResource(value);
    }
}

上面的类就是把SchedulerFactoryBean作为一个Bean注入进spring容器,是为了下一步创建Scheduler对象做准备。

import org.quartz.Scheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

/** Exposes the scheduler created by the {@link SchedulerFactoryBean} as the bean "scheduler". */
public class SchedulerConfiguration {

    /**
     * Publishes the factory's scheduler as a bean of its own.
     *
     * <p>NOTE(review): {@code initMethod = "start"} calls {@code start()} on the
     * scheduler during bean initialisation; confirm how that interacts with any
     * start-up delay configured on the factory.
     *
     * @param schedulerFactoryBean factory that owns the scheduler
     * @return the scheduler held by the factory
     */
    @Bean(name = "scheduler", initMethod = "start", destroyMethod = "shutdown")
    public Scheduler scheduler(@Autowired SchedulerFactoryBean schedulerFactoryBean) {
        return schedulerFactoryBean.getScheduler();
    }
}

以上代码我们就把所有事先需要的Bean都创建好了。还有一个配置文件:quartz.properties

#============================================================================
# Configure JobStore
# Using the Spring datasource set in DynamicSchedulingConfiguration
# Spring uses LocalDataSourceJobStore extension of JobStoreCMT
#============================================================================
org.quartz.jobStore.useProperties=false
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.isClustered = true
org.quartz.jobStore.clusterCheckinInterval = 5000
org.quartz.jobStore.misfireThreshold = 60000
org.quartz.jobStore.txIsolationLevelReadCommitted = true
# The DataSource is handed over through SchedulerFactoryBean.setDataSource, so the
# job store must be Spring's LocalDataSourceJobStore. JobStoreTX expects its own
# org.quartz.dataSource.* definition, and none is configured in this file.
org.quartz.jobStore.class = org.springframework.scheduling.quartz.LocalDataSourceJobStore
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate

#============================================================================
# Configure Main Scheduler Properties
# Needed to manage cluster instances
#============================================================================
org.quartz.scheduler.instanceName = ClusterQuartz
org.quartz.scheduler.instanceId= AUTO
org.quartz.scheduler.rmi.export = false
org.quartz.scheduler.rmi.proxy = false
org.quartz.scheduler.wrapJobExecutionInUserTransaction = false

#============================================================================
# Configure ThreadPool
# Can also be configured in spring configuration
#============================================================================
#org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
#org.quartz.threadPool.threadCount = 5
#org.quartz.threadPool.threadPriority = 5
#org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true

下面我们开始使用上面已经创建好的Bean:

import com.github.quartzcluster.support.CronJobDefinition;
import com.github.quartzcluster.support.Key;
import com.github.quartzcluster.support.SimpleJobDefinition;

/** Operations for scheduling jobs and for pausing, resuming and removing triggers. */
public interface IScheduleService {
    /**
     * Schedules a cron-based job.
     *
     * @param cronJobDefinition definition of the job and its cron schedule
     */
    void schedule(CronJobDefinition cronJobDefinition);

    /**
     * Schedules a simple (repeat-count / interval) job.
     *
     * @param simpleJobDefinition definition of the job and its simple schedule
     */
    void schedule(SimpleJobDefinition simpleJobDefinition);

    /**
     * Pauses a trigger.
     *
     * @param key name and group identifying the trigger
     */
    void pauseTrigger(Key key);

    /**
     * Resumes a trigger.
     *
     * @param key name and group identifying the trigger
     */
    void resumeTrigger(Key key);

    /**
     * Removes a trigger.
     *
     * @param key name and group identifying the trigger
     * @return the result reported by the implementation
     */
    boolean removeTrigger(Key key);
}
import com.github.quartzcluster.service.IScheduleService;
import com.github.quartzcluster.support.CronJobDefinition;
import com.github.quartzcluster.support.Key;
import com.github.quartzcluster.support.SimpleJobDefinition;
import org.quartz.Scheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * {@link IScheduleService} implementation that hands every call to the
 * definition or key object together with the injected {@link Scheduler}.
 */
@Service
public class ScheduleServiceImpl implements IScheduleService {

    /** Scheduler on which all operations are carried out. */
    @Autowired
    private Scheduler scheduler;

    /**
     * Schedules a cron-based job.
     *
     * @param cronJobDefinition job to schedule
     */
    @Override
    public void schedule(CronJobDefinition cronJobDefinition) {
        cronJobDefinition.scheduleJob(this.scheduler);
    }

    /**
     * Schedules a simple job.
     *
     * @param simpleJobDefinition job to schedule
     */
    @Override
    public void schedule(SimpleJobDefinition simpleJobDefinition) {
        simpleJobDefinition.scheduleJob(this.scheduler);
    }

    /**
     * Pauses the trigger identified by the key.
     *
     * @param key trigger name and group
     */
    @Override
    public void pauseTrigger(Key key) {
        key.pauseTrigger(this.scheduler);
    }

    /**
     * Resumes the trigger identified by the key.
     *
     * @param key trigger name and group
     */
    @Override
    public void resumeTrigger(Key key) {
        key.resumeTrigger(this.scheduler);
    }

    /**
     * Removes the trigger identified by the key.
     *
     * @param key trigger name and group
     * @return the value returned by {@code Key.removeTrigger}
     */
    @Override
    public boolean removeTrigger(Key key) {
        boolean removed = key.removeTrigger(this.scheduler);
        return removed;
    }
}

import com.github.quartzcluster.service.IScheduleService;
import com.github.quartzcluster.support.CronJobDefinition;
import com.github.quartzcluster.support.Key;
import com.github.quartzcluster.support.SimpleJobDefinition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * REST endpoints under {@code /job} for adding jobs and managing triggers.
 * Every argument is read from a named request part.
 */
@RestController
@RequestMapping("/job")
public class JobScheduleController {

    @Autowired private IScheduleService scheduleService;

    /**
     * Adds a cron job.
     *
     * @param cronJobDefinition job definition taken from the part "cronJobDefinition"
     */
    @PostMapping("/addCronJob")
    public void add(@RequestPart("cronJobDefinition") CronJobDefinition cronJobDefinition) {
        this.scheduleService.schedule(cronJobDefinition);
    }

    /**
     * Adds a simple job.
     *
     * @param simpleJobDefinition job definition taken from the part "simpleJobDefinition"
     */
    @PostMapping("/addSimpleJob")
    public void add(@RequestPart("simpleJobDefinition") SimpleJobDefinition simpleJobDefinition) {
        this.scheduleService.schedule(simpleJobDefinition);
    }

    /**
     * Removes a trigger.
     *
     * @param key trigger name and group taken from the part "key"
     * @return the value returned by the schedule service
     */
    @DeleteMapping("/removeTrigger")
    public boolean removeTrigger(@RequestPart("key") Key key) {
        boolean removed = this.scheduleService.removeTrigger(key);
        return removed;
    }

    /**
     * Pauses a trigger.
     *
     * @param key trigger name and group taken from the part "key"
     */
    @PutMapping("/pauseTrigger")
    public void pauseTrigger(@RequestPart("key") Key key) {
        this.scheduleService.pauseTrigger(key);
    }

    /**
     * Resumes a trigger.
     *
     * @param key trigger name and group taken from the part "key"
     */
    @PutMapping("/resumeTrigger")
    public void resumeTrigger(@RequestPart("key") Key key) {
        this.scheduleService.resumeTrigger(key);
    }
}
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobDetail;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;

/** Job definition whose trigger follows a cron expression. */
@Slf4j
@Data
public class CronJobDefinition extends JobDefinition {

    private static final long serialVersionUID = 6940446397330926681L;

    /** Cron expression that drives the schedule. */
    private String cronExpression;

    /**
     * Builds a cron trigger for the given job.
     *
     * @param jobDetail job the trigger is attached to
     * @return trigger identified by {@code triggerKey()} and scheduled by the cron expression
     */
    @Override
    protected Trigger trigger(JobDetail jobDetail) {
        CronScheduleBuilder cronSchedule = CronScheduleBuilder.cronSchedule(getCronExpression());
        return TriggerBuilder.newTrigger()
                .withIdentity(triggerKey())
                .forJob(jobDetail)
                .withSchedule(cronSchedule)
                .build();
    }
}

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobDetail;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;

import java.util.Date;

/** Job definition whose trigger repeats a fixed number of times at a fixed interval. */
@Slf4j
@Data
public class SimpleJobDefinition extends JobDefinition {

    private static final long serialVersionUID = 6940446397330926681L;

    /** Start time, passed to {@code new Date(long)}; 0 or less means "not set". */
    private long startTime;
    /** End time, passed to {@code new Date(long)}; 0 or less means "not set". */
    private long endTime;
    /** Repeat count. */
    private int repeatCount;

    /** Interval between executions, in seconds. */
    private int repeatIntervalInSeconds;

    /**
     * Builds a simple trigger for the given job.
     *
     * <p>The start and end bounds are applied only when they were supplied. An
     * unset {@code long} is 0, and turning that into {@code new Date(0)} would
     * produce a trigger that starts or ends in 1970 and can never fire.
     *
     * @param jobDetail job the trigger is attached to
     * @return trigger identified by {@code triggerKey()}
     */
    @Override
    protected Trigger trigger(JobDetail jobDetail) {
        TriggerBuilder<? extends Trigger> builder =
                TriggerBuilder.newTrigger()
                        .forJob(jobDetail)
                        .withIdentity(triggerKey())
                        .withSchedule(
                                SimpleScheduleBuilder.simpleSchedule()
                                        .withRepeatCount(getRepeatCount())
                                        .withIntervalInSeconds(getRepeatIntervalInSeconds()));
        // Without an explicit start the builder keeps its own default start time.
        if (getStartTime() > 0) {
            builder.startAt(new Date(getStartTime()));
        }
        // Without an explicit end the trigger has no end time.
        if (getEndTime() > 0) {
            builder.endAt(new Date(getEndTime()));
        }
        return builder.build();
    }
}

import com.github.quartzcluster.config.Const;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;

import java.io.Serializable;

/**
 * Base description of a schedulable job: which Spring bean and method to call,
 * plus the metadata stored with the Quartz job. Subclasses supply the trigger.
 */
@Slf4j
@Data
public abstract class JobDefinition extends Key implements Serializable {

    private static final long serialVersionUID = -8355775738649736514L;
    /** Job ID (must be globally unique, including across the cluster). */
    private String jobId;
    /** Job description. */
    private String description;
    /**
     * Whether a new run may start in parallel while the previous run is still
     * executing (relevant when the interval is short).
     */
    private boolean isConcurrent;
    /** Class name. */
    private String className;
    /** Name of the bean in the Spring container. */
    private String springId;
    /** Name of the method to invoke. */
    private String methodName;
    /** Argument passed to the method. */
    private String methodArg;
    /** Creation time. */
    private long createTime;
    /** Last update time. */
    private long updateTime;

    /**
     * Sets the concurrency flag.
     *
     * <p>Written out explicitly so that this setter exists regardless of how the
     * annotation processor treats the hand-written {@link #setIsConcurrent(boolean)}.
     *
     * @param concurrent {@code true} to allow parallel runs
     */
    public void setConcurrent(boolean concurrent) {
        this.isConcurrent = concurrent;
    }

    /**
     * Alias of {@link #setConcurrent(boolean)} under the bean property name
     * {@code isConcurrent}.
     *
     * <p>Payloads that use the key {@code "isConcurrent"} have no matching
     * property when only {@code setConcurrent} exists; with a bean-style JSON
     * binder the value would then be dropped and the flag would stay
     * {@code false}.
     *
     * @param concurrent {@code true} to allow parallel runs
     */
    public void setIsConcurrent(boolean concurrent) {
        this.isConcurrent = concurrent;
    }

    /**
     * Adds the job, or updates it when it already exists, and (re)schedules its
     * trigger.
     *
     * @param scheduler scheduler to register the job with
     * @throws IllegalArgumentException wrapping any {@link SchedulerException}
     */
    public void scheduleJob(Scheduler scheduler) {
        TriggerKey triggerKey = triggerKey();
        JobDetail jobDetail = jobDetail();
        Trigger trigger = trigger(jobDetail);
        try {
            // replace = true, storeNonDurableWhileAwaitingScheduling = true
            scheduler.addJob(jobDetail, true, true);
            if (scheduler.checkExists(triggerKey)) {
                scheduler.rescheduleJob(triggerKey, trigger);
            } else {
                scheduler.scheduleJob(trigger);
            }
        } catch (SchedulerException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Creates the trigger for the given job.
     *
     * @param jobDetail job the trigger belongs to
     * @return the trigger to schedule
     */
    protected abstract Trigger trigger(JobDetail jobDetail);

    /**
     * Creates the job detail. The job class depends on the concurrency flag, and
     * this definition is stored in the job data map under
     * {@code Const.JOB_DATA_KEY}.
     *
     * @return the job detail identified by this definition's name and group
     */
    private JobDetail jobDetail() {
        JobKey jobKey = new JobKey(getName(), getGroup());
        JobDetail jobDetail =
                JobBuilder.newJob(
                                isConcurrent()
                                        ? ConcurrentQuartzJob.class
                                        : DisallowConcurrentQuartzJob.class)
                        .withIdentity(jobKey)
                        .withDescription(getDescription())
                        .build();
        jobDetail.getJobDataMap().put(Const.JOB_DATA_KEY, this);
        return jobDetail;
    }
}

import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.TriggerKey;

/**
 * Name/group pair identifying a trigger, with helpers that pause, resume and
 * remove that trigger on a scheduler.
 *
 * <p>Implements {@link java.io.Serializable} because the subclass
 * {@code JobDefinition} is serializable and is placed in the Quartz job data
 * map. Fields declared in a non-serializable superclass are not written by
 * Java serialization, so {@code name} and {@code group} would be lost when the
 * definition is read back.
 */
@Data
public class Key implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    protected static final String DEFAULT_GROUP = "default_group";

    /** Job name. */
    private String name;
    /** Job group. */
    private String group;

    /**
     * Pauses the trigger.
     *
     * @param scheduler scheduler that owns the trigger
     * @throws IllegalArgumentException wrapping any {@link SchedulerException}
     */
    public void pauseTrigger(Scheduler scheduler) {
        TriggerKey triggerKey = triggerKey();
        try {
            scheduler.pauseTrigger(triggerKey);
        } catch (SchedulerException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Resumes the trigger.
     *
     * @param scheduler scheduler that owns the trigger
     * @throws IllegalArgumentException wrapping any {@link SchedulerException}
     */
    public void resumeTrigger(Scheduler scheduler) {
        TriggerKey triggerKey = triggerKey();
        try {
            scheduler.resumeTrigger(triggerKey);
        } catch (SchedulerException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Pauses the trigger and then unschedules it.
     *
     * @param scheduler scheduler that owns the trigger
     * @return the result of {@code Scheduler.unscheduleJob}
     * @throws IllegalArgumentException wrapping any {@link SchedulerException}
     */
    public boolean removeTrigger(Scheduler scheduler) {
        TriggerKey triggerKey = triggerKey();
        try {
            scheduler.pauseTrigger(triggerKey);
            return scheduler.unscheduleJob(triggerKey);
        } catch (SchedulerException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Builds the Quartz trigger key from the name and the effective group.
     *
     * @return the trigger key
     */
    protected TriggerKey triggerKey() {
        return new TriggerKey(getName(), getGroup());
    }

    /**
     * Returns the group, falling back to {@link #DEFAULT_GROUP} when it is blank.
     *
     * @return the effective group
     */
    public String getGroup() {
        return StringUtils.isBlank(group) ? DEFAULT_GROUP : group;
    }
}

我们的使用方式已经定义完毕,但是我们还需要自定义两个Job:

import com.github.quartzcluster.config.Const;
import com.github.quartzcluster.core.JobActuator;
import org.quartz.JobExecutionContext;
import org.quartz.PersistJobDataAfterExecution;
import org.springframework.scheduling.quartz.QuartzJobBean;

/**
 * Quartz job that reads the stored {@code JobDefinition} from the merged job
 * data map and passes it to {@link JobActuator}.
 */
@PersistJobDataAfterExecution
public class ConcurrentQuartzJob extends QuartzJobBean {

    /**
     * Runs the job.
     *
     * @param context execution context holding the merged job data map
     */
    @Override
    protected void executeInternal(JobExecutionContext context) {
        Object stored = context.getMergedJobDataMap().get(Const.JOB_DATA_KEY);
        JobActuator.invoke((JobDefinition) stored);
    }
}

import org.quartz.DisallowConcurrentExecution;
import org.quartz.PersistJobDataAfterExecution;

/**
 * Variant of {@link ConcurrentQuartzJob} that additionally carries
 * {@code @DisallowConcurrentExecution}; the execution logic is inherited unchanged.
 */
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class DisallowConcurrentQuartzJob extends ConcurrentQuartzJob {}

真正执行反射的代码就是JobActuator类:

import com.github.quartzcluster.config.ApplicationContextUtil;
import com.github.quartzcluster.support.JobDefinition;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Objects;

/** Invokes, through reflection, the bean method described by a {@link JobDefinition}. */
@Slf4j
public class JobActuator {

    /**
     * Runs the method named by the definition. Failures are logged and not
     * rethrown.
     *
     * @param jobDefinition definition naming the Spring bean, the method and the
     *     optional String argument
     */
    public static void invoke(JobDefinition jobDefinition) {
        String jobId = jobDefinition.getJobId();
        String springId = jobDefinition.getSpringId();
        String methodName = jobDefinition.getMethodName();
        String methodArgs = jobDefinition.getMethodArg();
        try {
            invoke(springId, methodName, methodArgs);
        } catch (NoSuchMethodException e) {
            log.error("无效的methodName:" + methodName, e);
        } catch (Exception e) {
            log.error("执行定时任务失败", e);
            log.error("id:{},  springId:{}, methodName:{}", jobId, springId, methodName);
        }
    }

    /**
     * Looks up the bean and calls the method. A blank argument selects the
     * no-argument overload; otherwise the overload taking one {@code String}.
     *
     * @param springId name of the bean in the Spring container
     * @param methodName name of the method to call
     * @param methodArgs argument to pass, or blank for none
     * @throws InvocationTargetException when the invoked method itself throws
     * @throws IllegalAccessException when the method cannot be accessed
     * @throws NoSuchMethodException when no matching method is found on the bean's
     *     class or any of its superclasses
     */
    private static void invoke(String springId, String methodName, String methodArgs)
            throws InvocationTargetException, IllegalAccessException, NoSuchMethodException {
        Object object = ApplicationContextUtil.getBean(springId);
        if (Objects.isNull(object)) {
            log.error("无效的springId:" + springId);
            return;
        }
        // Decide once whether an argument is passed, so lookup and call cannot disagree.
        boolean withoutArgument = StringUtils.isBlank(methodArgs);
        Method method =
                withoutArgument
                        ? findMethod(object.getClass(), methodName)
                        : findMethod(object.getClass(), methodName, String.class);
        method.setAccessible(true);
        if (withoutArgument) {
            method.invoke(object);
        } else {
            method.invoke(object, methodArgs);
        }
    }

    /**
     * Finds a declared method on the given class or the nearest superclass that
     * declares it. Looking only at the runtime class misses methods inherited
     * from a parent class, for instance when the bean is a generated subclass.
     *
     * @param type class to start searching from
     * @param methodName method name
     * @param parameterTypes exact parameter types
     * @return the first matching method found while walking up the hierarchy
     * @throws NoSuchMethodException when no class in the hierarchy declares it
     */
    private static Method findMethod(Class<?> type, String methodName, Class<?>... parameterTypes)
            throws NoSuchMethodException {
        for (Class<?> current = type; current != null; current = current.getSuperclass()) {
            try {
                return current.getDeclaredMethod(methodName, parameterTypes);
            } catch (NoSuchMethodException ignored) {
                // Not declared here; continue with the superclass.
            }
        }
        throw new NoSuchMethodException(type.getName() + "." + methodName);
    }
}

以上代码就将quartz扩展成了完全动态化的定时任务组件。欢迎感兴趣的小伙伴参与讨论。

下面我们测试一下,先来一个目标类:

import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.Date;

/** Sample target bean whose methods print a message followed by the current time. */
@Service
public class TestService {

    /** Prints the fixed label followed by the current timestamp. */
    public void test1() {
        String line = "测试任务1" + timestamp();
        System.out.println(line);
    }

    /**
     * Prints the given text, a colon and the current timestamp.
     *
     * @param param text to print in front of the timestamp
     */
    public void test2(String param) {
        String line = param + ":" + timestamp();
        System.out.println(line);
    }

    /** Formats the current time with millisecond precision. */
    private static String timestamp() {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        return format.format(new Date());
    }
}

简单执行一下测试用例:

# Bean name: testService
# Method name: test1
# Schedule: 0/30 * * * * ?
# curl -F generates the multipart Content-Type header (including the boundary)
# itself, so it is not set by hand. ";type=application/json" gives the part a
# content type so the server can convert it from JSON.
curl -X "POST" "http://localhost:8080/job/addCronJob" \
     -F "cronJobDefinition={\"cronExpression\":\"0/30 * * * * ? \",\"jobId\":\"1\",\"description\":\"测试定时任务1\",\"isConcurrent\":false,\"className\":\"com.github.quartzcluster.service.TestService\",\"springId\":\"testService\",\"methodName\":\"test1\",\"methodArg\":\"\",\"name\":\"name2\",\"group\":\"group1\",\"createTime\":1478422605000,\"updateTime\":1491305256000};type=application/json"


# Bean name: testService
# Method name: test2
# Argument: 方法参数1
# Schedule: 0/10 * * * * ?
# curl -F generates the multipart Content-Type header (including the boundary)
# itself, so it is not set by hand. ";type=application/json" gives the part a
# content type so the server can convert it from JSON.
curl -X "POST" "http://localhost:8080/job/addCronJob" \
     -F "cronJobDefinition={\"cronExpression\":\"0/10 * * * * ? \",\"jobId\":\"1\",\"description\":\"测试定时任务2\",\"isConcurrent\":false,\"className\":\"com.github.quartzcluster.service.TestService\",\"springId\":\"testService\",\"methodName\":\"test2\",\"methodArg\":\"方法参数1\",\"name\":\"name1\",\"group\":\"group1\",\"createTime\":1478422605000,\"updateTime\":1491305256000};type=application/json"


# Bean name: testService
# Method name: test2
# Argument: 方法参数2
# Start time: 2019-05-21 16:00:00
# End time: 2100-05-21 18:00:00
# Runs 1000 times, 10 seconds apart
# The JSON previously read "methodArg":"方法参数2":"name1"; the missing
# ,"name" key is restored so the payload parses.
# curl -F generates the multipart Content-Type header (including the boundary)
# itself, so it is not set by hand. ";type=application/json" gives the part a
# content type so the server can convert it from JSON.
curl -X "POST" "http://localhost:8080/job/addSimpleJob" \
     -F "simpleJobDefinition={\"jobId\":\"1\",\"description\":\"测试定时任务\",\"isConcurrent\":false,\"className\":\"com.github.quartzcluster.service.TestService\",\"springId\":\"testService\",\"methodName\":\"test2\",\"methodArg\":\"方法参数2\",\"name\":\"name1\",\"group\":\"group2\",\"createTime\":1478422605000,\"updateTime\":1491305256000,\"startTime\":1558425577130,\"endTime\":4114576800000,\"repeatCount\":1000,\"repeatIntervalInSeconds\":10};type=application/json"
上一篇下一篇

猜你喜欢

热点阅读