分布式系统(distributed system)

SpringBoot+Quartz实现动态定时任务

2024-05-10  本文已影响0人  小波同学

目前常用的几种任务调度

SpringBoot项目的实现

1.config配置

import org.quartz.Scheduler;
import org.quartz.ee.servlet.QuartzInitializerListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
  
import java.io.IOException;
import java.util.Properties;
  
/**
 * quartz配置
 */
@Configuration
public class SchedulerConfig {
     
    @Bean(name="SchedulerFactory")
    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setAutoStartup(true);
        //factory.setStartupDelay(5);//延时5秒启动
        return factory;
    }
  
    /*
     * quartz初始化监听器
     */
    @Bean
    public QuartzInitializerListener executorListener() {
       return new QuartzInitializerListener();
    }
  
    /*
     * 通过SchedulerFactoryBean获取Scheduler的实例
     */
    @Bean(name="Scheduler")
    public Scheduler scheduler() throws IOException {
        return schedulerFactoryBean().getScheduler();
    }
  
}

定时任务动态操作

import lombok.extern.slf4j.Slf4j;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;

import javax.annotation.Resource;

/**
 * @Author: huangyibo
 * @Date: 2024/5/11 14:29
 * @Description: 定时任务动态操作
 */

@Component
@Slf4j
public class TaskGroupHandler {

    @Resource
    private Scheduler scheduler;

    /**
     * 新增定时任务
     * @param jobId
     */
    public void addCronJob(String jobId) {
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(jobId, "TaskGroup");

            //构建job信息
            JobDetail job = JobBuilder.newJob(TaskGroupJob.class).withIdentity(jobId, "TaskGroup")
                    .withDescription("任务组编排").build();
            JobDataMap jobDataMap = job.getJobDataMap();
            jobDataMap.put("jobId", jobId);
            //CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule("cron的表达式");

            CronTrigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity(triggerKey)
                    .startNow()
                    .withSchedule(CronScheduleBuilder.cronSchedule("10 * * * * ?").withMisfireHandlingInstructionFireAndProceed())
                    .build();

            //SimpleScheduleBuilder.simpleSchedule().withRepeatCount(0).withIntervalInSeconds(20)//每隔多少秒执行一次; withRepeatCount 设置重复的次数
            //.startNow().withSchedule(cronScheduleBuilder)
            //交由Scheduler安排触发
            scheduler.scheduleJob(job, trigger);
            if(!scheduler.isStarted()){
                scheduler.start();
            }
            log.info("添加定时任务成功, startJob:{}", jobId);
        } catch (SchedulerException e) {
            log.error("添加定时任务异常, jobId:{}", jobId, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * 删除任务
     * @param jobId
     * @return
     */
    public void removeCronJob(String jobId) {
        try {
            // TriggerKey 定义了trigger的名称和组别 ,通过任务名和任务组名获取TriggerKey
            TriggerKey triggerKey = TriggerKey.triggerKey(jobId,"TaskGroup");
            // 停止触发器
            scheduler.resumeTrigger(triggerKey);
            // 移除触发器
            scheduler.unscheduleJob(triggerKey);
            // 移除任务
            scheduler.deleteJob(JobKey.jobKey(jobId,"TaskGroup"));
            log.info("删除定时任务成功, jobId:{}", jobId);
        } catch (SchedulerException e) {
            log.error("删除定时任务异常, jobId:{}", jobId, e);
        }
    }

    /**
     * 暂停定时任务
     * @param jobId
     */
    public void pauseJob(String jobId) {
        try {
            JobKey jobKey = JobKey.jobKey(jobId,"TaskGroup");
            // 暂停任务
            scheduler.pauseJob(jobKey);
            log.info("暂停定时任务成功, jobId:{}", jobId);
        } catch (SchedulerException e) {
            log.error("暂停定时任务异常, jobId:{}", jobId, e);
        }
    }

    /**
     * 继续定时任务
     * @param jobId
     */
    @GetMapping("/resumeJob")
    public void resumeJob(String jobId) {
        try {
            // 通过任务名和任务组名获取jobKey
            JobKey jobKey = JobKey.jobKey(jobId,"TaskGroup");
            // 继续任务
            scheduler.resumeJob(jobKey);
            log.info("继续定时任务成功, jobId:{}", jobId);
        } catch (SchedulerException e) {
            log.error("继续定时任务异常, jobId:{}", jobId, e);
        }
    }
}
@RestController
@RequestMapping("/api/test")
@Slf4j
public class TestController {

    @Resource
    private TaskGroupHandler taskGroupHandler;


    @GetMapping("/quartzAdd")
    public boolean quartzAdd(String jobId) {
        taskGroupHandler.addCronJob(jobId);
        log.info("添加定时任务成功, startJob:{}", jobId);
        return Boolean.TRUE;
    }

    /**
     * 删除任务
     *
     * @param jobId
     * @return
     */
    @GetMapping("/quartzDel")
    public boolean quartzDel(String jobId) {
        taskGroupHandler.removeCronJob(jobId);
        log.info("删除定时任务成功, startJob:{}", jobId);
        return Boolean.TRUE;
    }

    // 暂停定时任务
    @GetMapping("/pauseJob")
    public void pauseJob(String jobId) {
        taskGroupHandler.pauseJob(jobId);
        log.info("暂停定时任务成功, jobId:{}", jobId);
    }

    // 继续定时任务
    @GetMapping("/resumeJob")
    public void resumeJob(String jobId) {
        taskGroupHandler.resumeJob(jobId);
        log.info("继续定时任务成功, jobId:{}", jobId);
    }
}

任务预热,预先加载数据库已经配置好的任务

/**
 * @Author: huangyibo
 * @Date: 2024/5/10 18:26
 * @Description: 任务预热,预先加载数据库已经配置好的任务
 */
@Component
@Slf4j
public class TaskGroupJobRunner implements CommandLineRunner {

    @Resource
    private TaskGroupHandler taskGroupHandler;

    @Override
    public void run(String... args) {
        // 初始加载数据库里状态为正常的定时任务
        List<String> list = Stream.of("1", "2").collect(Collectors.toList());
        list.forEach(jobId -> {
            taskGroupHandler.addCronJob(jobId);
        });
    }
}

TaskGroupJob实现具体逻辑

@Component
@Slf4j
public class TaskGroupJob implements Job {

    DateTimeFormatter formatter= DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        JobDataMap jdMap = context.getJobDetail().getJobDataMap();
        String jobId = (String) jdMap.get("jobId");

        //TODO 逻辑待补充

        log.info(formatter.format(LocalDateTime.now()) + "----TaskGroupJob-计划执行开始===>jobId:{}", jobId);
    }

}

参考:
https://www.jb51.net/article/261554.htm

https://www.cnblogs.com/Alida/p/12986967.html

上一篇 下一篇

猜你喜欢

热点阅读